package notifications

import "sync"

// operation identifies which action a cmd asks the publisher's command loop
// (start) to perform.
type operation int

const (
	// subscribe registers cmd.sub on every topic in cmd.topics.
	subscribe operation = iota
	// pub delivers cmd.msg to the subscribers of every topic in cmd.topics.
	pub
	// unsubAll removes cmd.sub from every topic it is registered on.
	unsubAll
	// closeTopic removes all subscribers from every topic in cmd.topics.
	closeTopic
	// shutdown ends the command loop, after which all remaining
	// subscriptions are removed.
	shutdown
)

// cmd is one instruction for the command loop in start, delivered over
// publisher.cmdChan. Which fields are meaningful depends on op: subscribe
// uses topics and sub, pub uses topics and msg, closeTopic uses topics,
// unsubAll uses sub, and shutdown uses none of them.
//
// The loop treats a nil topics slice as the marker for a topic-less command
// (unsubAll or shutdown), so topic-scoped commands must set topics.
type cmd struct {
	op     operation
	topics []Topic
	sub    Subscriber
	msg    Event
}

// publisher is the implementation returned by NewPublisher. Its methods do
// not touch subscription state themselves; each one sends a cmd over cmdChan
// to the goroutine launched by Startup, and that goroutine keeps the
// subscriber registry local to itself.
//
// lk is read-locked by the methods that enqueue commands and write-locked by
// Shutdown. closed is closed by Shutdown; once it is closed the other methods
// return without sending. cmdChan is the (unbuffered) command queue.
type publisher struct {
	lk      sync.RWMutex
	closed  chan struct{}
	cmdChan chan cmd
}

// NewPublisher builds a Publisher with an unbuffered command channel and an
// open shutdown-signal channel. It does not start any goroutine: Startup must
// be called to launch the command loop, and until then any call that sends a
// command will block.
func NewPublisher() Publisher {
	return &publisher{
		closed:  make(chan struct{}),
		cmdChan: make(chan cmd),
	}
}

// Startup launches the command loop (start) in a new goroutine. Every call
// starts another loop with its own, separate registry, so this is presumably
// meant to be called exactly once per publisher — confirm against callers.
func (ps *publisher) Startup() {
	go ps.start()
}

// Publish hands event to the command loop for delivery to every subscriber
// currently registered on topic. After Shutdown it does nothing.
//
// The command channel is unbuffered, so the call blocks until the loop
// started by Startup receives the command. The read lock is held for the
// duration, which keeps Shutdown from closing the publisher mid-send.
func (ps *publisher) Publish(topic Topic, event Event) {
	ps.lk.RLock()
	defer ps.lk.RUnlock()

	select {
	case <-ps.closed:
		// Already shut down: drop the event.
	default:
		ps.cmdChan <- cmd{op: pub, topics: []Topic{topic}, msg: event}
	}
}

// Shutdown stops the publisher. It closes the closed channel so that later
// calls to the other methods become no-ops, then sends a shutdown command,
// which makes the command loop exit and remove every remaining subscription.
// Calling Shutdown again has no effect.
//
// It takes the write lock, so it waits for in-flight senders to finish, and
// the final send blocks until the command loop receives it.
func (ps *publisher) Shutdown() {
	ps.lk.Lock()
	defer ps.lk.Unlock()

	select {
	case <-ps.closed:
		// A previous call already shut the publisher down.
	default:
		close(ps.closed)
		ps.cmdChan <- cmd{op: shutdown}
	}
}

// Close asks the command loop to remove every subscriber from topic id; each
// removed subscriber is notified through the registry's remove path. After
// Shutdown it does nothing. Like the other senders it blocks until the
// command loop receives the command.
func (ps *publisher) Close(id Topic) {
	ps.lk.RLock()
	defer ps.lk.RUnlock()

	select {
	case <-ps.closed:
		// Already shut down: nothing left to close.
	default:
		ps.cmdChan <- cmd{op: closeTopic, topics: []Topic{id}}
	}
}

// Subscribe asks the command loop to register sub on topic. It returns false,
// without sending anything, if the publisher has been shut down, and true
// once the command has been handed to the loop. The hand-off blocks until the
// command loop receives it.
func (ps *publisher) Subscribe(topic Topic, sub Subscriber) bool {
	ps.lk.RLock()
	defer ps.lk.RUnlock()

	select {
	case <-ps.closed:
		return false
	default:
		ps.cmdChan <- cmd{op: subscribe, topics: []Topic{topic}, sub: sub}
		return true
	}
}

// Unsubscribe asks the command loop to remove sub from every topic it is
// registered on. It returns false, without sending anything, if the publisher
// has been shut down, and true once the command has been handed to the loop.
// The hand-off blocks until the command loop receives it.
func (ps *publisher) Unsubscribe(sub Subscriber) bool {
	ps.lk.RLock()
	defer ps.lk.RUnlock()

	select {
	case <-ps.closed:
		return false
	default:
		ps.cmdChan <- cmd{op: unsubAll, sub: sub}
		return true
	}
}

// start is the command loop run by Startup. It owns the subscriber registry
// and applies commands from cmdChan one at a time, so the registry needs no
// locking of its own.
//
// Commands with a non-nil topics slice (subscribe, pub, closeTopic) are
// applied to each listed topic; any other op paired with topics is ignored.
// Commands with nil topics are the topic-less ones: unsubAll removes the
// subscriber everywhere and shutdown ends the loop. Once the loop ends, every
// remaining subscription is removed.
func (ps *publisher) start() {
	registry := subscriberRegistry{
		topics:    make(map[Topic]map[Subscriber]struct{}),
		revTopics: make(map[Subscriber]map[Topic]struct{}),
	}

	for c := range ps.cmdChan {
		if c.topics != nil {
			// Topic-scoped command: apply it to each topic in turn.
			for _, topic := range c.topics {
				switch c.op {
				case subscribe:
					registry.add(topic, c.sub)
				case pub:
					registry.send(topic, c.msg)
				case closeTopic:
					registry.removeTopic(topic)
				}
			}
			continue
		}

		// Topic-less command.
		if c.op == shutdown {
			break
		}
		if c.op == unsubAll {
			registry.removeSubscriber(c.sub)
		}
	}

	// Drain the registry. Deleting entries while ranging over a map is
	// permitted, so topics can be emptied as they are visited.
	for topic := range registry.topics {
		registry.removeTopic(topic)
	}
}

// subscriberRegistry is a two-way index between topics and subscribers.
// topics maps a topic to the set of subscribers registered on it, and
// revTopics maps a subscriber to the set of topics it is registered on. add
// and remove update both maps together, and remove drops a key from either
// map once its set becomes empty.
type subscriberRegistry struct {
	topics    map[Topic]map[Subscriber]struct{}
	revTopics map[Subscriber]map[Topic]struct{}
}

// add registers sub on topic, recording the pairing in both directions and
// creating the inner sets the first time a topic or subscriber is seen.
// Adding a pairing that already exists changes nothing.
func (reg *subscriberRegistry) add(topic Topic, sub Subscriber) {
	subs := reg.topics[topic]
	if subs == nil {
		subs = make(map[Subscriber]struct{})
		reg.topics[topic] = subs
	}
	subs[sub] = struct{}{}

	subscribed := reg.revTopics[sub]
	if subscribed == nil {
		subscribed = make(map[Topic]struct{})
		reg.revTopics[sub] = subscribed
	}
	subscribed[topic] = struct{}{}
}

// send delivers msg to every subscriber currently registered on topic by
// calling its OnNext. A topic with no subscribers is a no-op. Delivery order
// follows map iteration and is therefore unspecified.
func (reg *subscriberRegistry) send(topic Topic, msg Event) {
	recipients := reg.topics[topic]
	for recipient := range recipients {
		recipient.OnNext(topic, msg)
	}
}

// removeTopic unregisters every subscriber from topic via remove. An unknown
// topic is a no-op. Entries are deleted while the set is being ranged over,
// which Go permits.
func (reg *subscriberRegistry) removeTopic(topic Topic) {
	members := reg.topics[topic]
	for member := range members {
		reg.remove(topic, member)
	}
}

// removeSubscriber unregisters sub from every topic it is on via remove. An
// unknown subscriber is a no-op. Entries are deleted while the set is being
// ranged over, which Go permits.
func (reg *subscriberRegistry) removeSubscriber(sub Subscriber) {
	subscribed := reg.revTopics[sub]
	for t := range subscribed {
		reg.remove(t, sub)
	}
}

// remove unregisters sub from topic and then notifies it with OnClose(topic).
// If sub is not registered on topic (including when the topic is unknown)
// nothing happens and OnClose is not called. A topic or subscriber whose set
// becomes empty is dropped from its index.
func (reg *subscriberRegistry) remove(topic Topic, sub Subscriber) {
	// Indexing a nil inner map reports "not present", so one lookup covers
	// both the unknown-topic and the not-subscribed cases.
	subs := reg.topics[topic]
	if _, subscribed := subs[sub]; !subscribed {
		return
	}

	delete(subs, sub)
	if len(subs) == 0 {
		delete(reg.topics, topic)
	}

	topicsOfSub := reg.revTopics[sub]
	delete(topicsOfSub, topic)
	if len(topicsOfSub) == 0 {
		delete(reg.revTopics, sub)
	}

	sub.OnClose(topic)
}
